% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_peruser).
-behaviour(gen_server).
-behaviour(mem3_cluster).

-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").

% gen_server callbacks
-export([
    start_link/0,
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).

-export([init_changes_handler/1, changes_handler/3]).

% mem3_cluster callbacks
-export([
    cluster_stable/1,
    cluster_unstable/1
]).

-record(changes_state, {
    parent :: pid(),
    db_name :: binary(),
    delete_dbs :: boolean(),
    changes_pid :: pid(),
    changes_ref :: reference(),
    q_for_peruser_db :: integer(),
    peruser_dbname_prefix :: binary()
}).

-record(state, {
    parent :: pid(),
    db_name :: binary(),
    delete_dbs :: boolean(),
    states :: list(),
    mem3_cluster_pid :: pid(),
    cluster_stable :: boolean(),
    q_for_peruser_db :: integer(),
    peruser_dbname_prefix :: binary()
}).

-define(DEFAULT_USERDB_PREFIX, "userdb-").
-define(RELISTEN_DELAY, 5000).
% seconds
-define(DEFAULT_QUIET_PERIOD, 60).
% seconds
-define(DEFAULT_START_PERIOD, 5).

%%
%% Please leave in the commented-out couch_log:debug calls, thanks! — Jan
%%
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec init_state() -> #state{}.
init_state() ->
    couch_log:debug("peruser: starting on node ~p in pid ~p", [node(), self()]),
    case config:get_boolean("couch_peruser", "enable", false) of
        false ->
            couch_log:debug("peruser: disabled on node ~p", [node()]),
            #state{};
        true ->
            couch_log:debug("peruser: enabled on node ~p", [node()]),
            DbName = ?l2b(
                config:get(
                    "couch_httpd_auth", "authentication_db", "_users"
                )
            ),
            ioq:set_io_priority({system, DbName}),
            DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false),
            Q = config:get_integer("couch_peruser", "q", 1),
            Prefix = config:get("couch_peruser", "database_prefix", ?DEFAULT_USERDB_PREFIX),
            case couch_db:validate_dbname(Prefix) of
                ok ->
                    ok;
                Error ->
                    couch_log:error(
                        "couch_peruser can't proceed as illegal database prefix ~p.\n"
                        "                    Error: ~p",
                        [Prefix, Error]
                    ),
                    throw(Error)
            end,

            % set up cluster-stable listener
            Period = abs(
                config:get_integer(
                    "couch_peruser",
                    "cluster_quiet_period",
                    ?DEFAULT_QUIET_PERIOD
                )
            ),
            StartPeriod = abs(
                config:get_integer(
                    "couch_peruser",
                    "cluster_start_period",
                    ?DEFAULT_START_PERIOD
                )
            ),

            {ok, Mem3Cluster} = mem3_cluster:start_link(
                ?MODULE,
                self(),
                StartPeriod,
                Period
            ),

            #state{
                parent = self(),
                db_name = DbName,
                delete_dbs = DeleteDbs,
                mem3_cluster_pid = Mem3Cluster,
                cluster_stable = false,
                q_for_peruser_db = Q,
                peruser_dbname_prefix = ?l2b(Prefix)
            }
    end.

-spec start_listening(State :: #state{}) -> #state{} | ok.
start_listening(#state{states = ChangesStates} = State) when
    length(ChangesStates) > 0
->
    % couch_log:debug("peruser: start_listening() already run on node ~p in pid ~p", [node(), self()]),
    State;
start_listening(
    #state{
        db_name = DbName,
        delete_dbs = DeleteDbs,
        q_for_peruser_db = Q,
        peruser_dbname_prefix = Prefix
    } = State
) ->
    % couch_log:debug("peruser: start_listening() on node ~p", [node()]),
    try
        States = lists:map(
            fun(A) ->
                S = #changes_state{
                    parent = State#state.parent,
                    db_name = A#shard.name,
                    delete_dbs = DeleteDbs,
                    q_for_peruser_db = Q,
                    peruser_dbname_prefix = Prefix
                },
                {Pid, Ref} = spawn_opt(
                    ?MODULE, init_changes_handler, [S], [link, monitor]
                ),
                S#changes_state{changes_pid = Pid, changes_ref = Ref}
            end,
            mem3:local_shards(DbName)
        ),
        % couch_log:debug("peruser: start_listening() States ~p", [States]),

        State#state{states = States, cluster_stable = true}
    catch
        error:database_does_not_exist ->
            couch_log:warning(
                "couch_peruser can't proceed as underlying database (~s) is missing, disables itself.",
                [DbName]
            ),
            config:set(
                "couch_peruser",
                "enable",
                "false",
                lists:concat([binary_to_list(DbName), " is missing"])
            )
    end.

-spec init_changes_handler(ChangesState :: #changes_state{}) -> ok.
init_changes_handler(#changes_state{db_name = DbName} = ChangesState) ->
    % couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]),
    ioq:set_io_priority({system, DbName}),
    try
        {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]),
        FunAcc = {fun ?MODULE:changes_handler/3, ChangesState},
        (couch_changes:handle_db_changes(
            #changes_args{feed = "continuous", timeout = infinity},
            {json_req, null},
            Db
        ))(
            FunAcc
        )
    catch
        error:database_does_not_exist ->
            ok
    end.

-type db_change() :: {atom(), tuple(), binary()}.
-spec changes_handler(
    Change :: db_change(),
    ResultType :: any(),
    ChangesState :: #changes_state{}
) -> #changes_state{}.
changes_handler(
    {change, {Doc}, _Prepend},
    _ResType,
    ChangesState = #changes_state{
        db_name = DbName,
        q_for_peruser_db = Q,
        peruser_dbname_prefix = Prefix
    }
) ->
    % couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]),

    case couch_util:get_value(<<"id">>, Doc) of
        <<"org.couchdb.user:", User/binary>> = DocId ->
            case should_handle_doc(DbName, DocId) of
                true ->
                    case couch_util:get_value(<<"deleted">>, Doc, false) of
                        false ->
                            UserDb = ensure_user_db(Prefix, User, Q),
                            ok = ensure_security(User, UserDb, fun add_user/3),
                            ChangesState;
                        true ->
                            case ChangesState#changes_state.delete_dbs of
                                true ->
                                    _UserDb = delete_user_db(Prefix, User),
                                    ChangesState;
                                false ->
                                    UserDb = user_db_name(Prefix, User),
                                    ok = ensure_security(User, UserDb, fun remove_user/3),
                                    ChangesState
                            end
                    end;
                false ->
                    ChangesState
            end;
        _ ->
            ChangesState
    end;
changes_handler(_Event, _ResType, ChangesState) ->
    ChangesState.

-spec should_handle_doc(ShardName :: binary(), DocId :: binary()) -> boolean().
should_handle_doc(ShardName, DocId) ->
    case is_stable() of
        false ->
            % when the cluster is unstable, we have already stopped all Listeners
            % the next stable event will restart all listeners and pick up this
            % doc change
            couch_log:debug(
                "peruser: skipping, cluster unstable ~s/~s",
                [ShardName, DocId]
            ),
            false;
        true ->
            should_handle_doc_int(ShardName, DocId)
    end.

-spec should_handle_doc_int(
    ShardName :: binary(),
    DocId :: binary()
) -> boolean().
should_handle_doc_int(ShardName, DocId) ->
    DbName = mem3:dbname(ShardName),
    Live = [erlang:node() | erlang:nodes()],
    Shards = mem3:shards(DbName, DocId),
    Nodes = [N || #shard{node = N} <- Shards, lists:member(N, Live)],
    case mem3:owner(DbName, DocId, Nodes) of
        ThisNode when ThisNode =:= node() ->
            couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
            % do the database action
            true;
        _OtherNode ->
            couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
            false
    end.

-spec delete_user_db(Prefix :: binary(), User :: binary()) -> binary().
delete_user_db(Prefix, User) ->
    UserDb = user_db_name(Prefix, User),
    try
        case fabric:delete_db(UserDb, [?ADMIN_CTX]) of
            ok -> ok;
            accepted -> ok
        end
    catch
        error:database_does_not_exist ->
            ok
    end,
    UserDb.

-spec ensure_user_db(Prefix :: binary(), User :: binary(), Q :: integer()) -> binary().
ensure_user_db(Prefix, User, Q) ->
    UserDb = user_db_name(Prefix, User),
    try
        {ok, _DbInfo} = fabric:get_db_info(UserDb)
    catch
        error:database_does_not_exist ->
            case fabric:create_db(UserDb, [?ADMIN_CTX, {q, integer_to_list(Q)}]) of
                {error, file_exists} -> ok;
                ok -> ok;
                accepted -> ok
            end
    end,
    UserDb.

-spec add_user(
    User :: binary(),
    Properties :: tuple(),
    Acc :: tuple()
) -> tuple().
add_user(User, Prop, {Modified, SecProps}) ->
    {PropValue} = couch_util:get_value(Prop, SecProps, {[]}),
    Names = couch_util:get_value(<<"names">>, PropValue, []),
    case lists:member(User, Names) of
        true ->
            {Modified, SecProps};
        false ->
            {true,
                lists:keystore(
                    Prop,
                    1,
                    SecProps,
                    {Prop,
                        {lists:keystore(
                            <<"names">>,
                            1,
                            PropValue,
                            {<<"names">>, [User | Names]}
                        )}}
                )}
    end.

-spec remove_user(
    User :: binary(),
    Properties :: tuple(),
    Acc :: tuple()
) -> tuple().
remove_user(User, Prop, {Modified, SecProps}) ->
    {PropValue} = couch_util:get_value(Prop, SecProps, {[]}),
    Names = couch_util:get_value(<<"names">>, PropValue, []),
    case lists:member(User, Names) of
        false ->
            {Modified, SecProps};
        true ->
            {true,
                lists:keystore(
                    Prop,
                    1,
                    SecProps,
                    {Prop,
                        {lists:keystore(
                            <<"names">>,
                            1,
                            PropValue,
                            {<<"names">>, lists:delete(User, Names)}
                        )}}
                )}
    end.

-spec ensure_security(
    User :: binary(),
    UserDb :: binary(),
    TransformFun :: fun()
) -> ok.
ensure_security(User, UserDb, TransformFun) ->
    case fabric:get_all_security(UserDb, [?ADMIN_CTX]) of
        {error, no_majority} ->
            % TODO: make sure this is still true: single node, ignore
            ok;
        {ok, Shards} ->
            {_ShardInfo, {SecProps}} = hd(Shards),
            % assert that shards have the same security object
            true = lists:all(
                fun({_, {SecProps1}}) ->
                    SecProps =:= SecProps1
                end,
                Shards
            ),
            case
                lists:foldl(
                    fun(Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
                    {false, SecProps},
                    [<<"admins">>, <<"members">>]
                )
            of
                {false, _} ->
                    ok;
                {true, SecProps1} ->
                    ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
            end
    end.

-spec user_db_name(Prefix :: binary(), User :: binary()) -> binary().
user_db_name(Prefix, User) ->
    HexUser = list_to_binary(
        [string:to_lower(integer_to_list(X, 16)) || <<X>> <= User]
    ),
    <<Prefix/binary, HexUser/binary>>.

-spec exit_changes(State :: #state{}) -> ok.
exit_changes(State) ->
    lists:foreach(
        fun(ChangesState) ->
            demonitor(ChangesState#changes_state.changes_ref, [flush]),
            unlink(ChangesState#changes_state.changes_pid),
            exit(ChangesState#changes_state.changes_pid, kill)
        end,
        State#state.states
    ).

-spec is_stable() -> true | false.
is_stable() ->
    gen_server:call(?MODULE, is_stable).

-spec subscribe_for_changes() -> ok.
subscribe_for_changes() ->
    config:subscribe_for_changes([
        {"couch_httpd_auth", "authentication_db"},
        "couch_peruser"
    ]).

% Mem3 cluster callbacks

% TODO: find out what type Server is
-spec cluster_unstable(Server :: any()) -> any().
cluster_unstable(Server) ->
    gen_server:cast(Server, cluster_unstable),
    Server.

% TODO: find out what type Server is
-spec cluster_stable(Server :: any()) -> any().
cluster_stable(Server) ->
    gen_server:cast(Server, cluster_stable),
    Server.

%% gen_server callbacks
-spec init(Options :: list()) -> {ok, #state{}}.
init([]) ->
    ok = subscribe_for_changes(),
    {ok, init_state()}.

handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
    {reply, IsStable, State};
handle_call(_Msg, _From, State) ->
    {reply, error, State}.

handle_cast(update_config, State) when State#state.states =/= undefined ->
    exit_changes(State),
    {noreply, init_state()};
handle_cast(update_config, _) ->
    {noreply, init_state()};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(cluster_unstable, State) when State#state.states =/= undefined ->
    exit_changes(State),
    {noreply, init_state()};
handle_cast(cluster_unstable, _) ->
    {noreply, init_state()};
handle_cast(cluster_stable, State) ->
    {noreply, start_listening(State)};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', _Ref, _, _, _Reason}, State) ->
    {stop, normal, State};
handle_info({config_change, "couch_peruser", _, _, _}, State) ->
    handle_cast(update_config, State);
handle_info(
    {
        config_change,
        "couch_httpd_auth",
        "authentication_db",
        _,
        _
    },
    State
) ->
    handle_cast(update_config, State);
handle_info({gen_event_EXIT, _Handler, _Reason}, State) ->
    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
    {noreply, State};
handle_info({'EXIT', _Pid, _Reason}, State) ->
    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
    {noreply, State};
handle_info(restart_config_listener, State) ->
    ok = subscribe_for_changes(),
    {noreply, State};
handle_info(_Msg, State) ->
    {noreply, State}.
